package per.cyl.log.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Kafka listener component that consumes records from the {@code kafka_log_topic}
 * topic, maps the JSON payload of each record onto a {@link Message} and prints
 * the result to standard output.
 *
 * @author 陈玉林
 * @date 2020/6/4 9:04
 */
@Component
public class MessageConsumer {

    /**
     * Handles a single record received from {@code kafka_log_topic}.
     *
     * <p>The record value is converted with {@code toString()} and parsed as JSON.
     * The {@code level}, {@code loggerName} and {@code message} entries are copied
     * into a new {@link Message}; an entry that is absent from the payload is set
     * to {@code null} instead of failing the whole record.
     *
     * <p>Records that cannot be used (null value, malformed JSON, or a JSON value
     * that is not an object) are reported on standard error and skipped.
     * NOTE(review): skipping avoids handing an exception to the listener
     * container's error handling, which may redeliver the same record depending
     * on configuration — confirm that dropping bad records is acceptable here.
     *
     * @param record the consumed Kafka record; its value is expected to be a JSON object
     */
    @KafkaListener(topics = "kafka_log_topic")
    public void listen(ConsumerRecord<?, ?> record) {
        Object value = record.value();
        if (value == null) {
            // A record may legitimately carry no value (e.g. a tombstone); nothing to parse.
            System.err.println("Skipping record with null value at " + describe(record));
            return;
        }

        Object parsed;
        try {
            parsed = JSON.parse(value.toString());
        } catch (JSONException e) {
            System.err.println("Skipping record with malformed JSON at " + describe(record)
                    + ": " + e.getMessage());
            return;
        }

        // JSON.parse may yield null (empty text), a JSONArray or a scalar; only objects carry fields.
        if (!(parsed instanceof JSONObject)) {
            System.err.println("Skipping record whose payload is not a JSON object at "
                    + describe(record));
            return;
        }
        JSONObject jsonObject = (JSONObject) parsed;

        Message message = new Message();
        // getString returns null for a missing key and value.toString() otherwise,
        // so present fields are mapped exactly as before.
        message.setLevel(jsonObject.getString("level"));
        message.setLoggerName(jsonObject.getString("loggerName"));
        message.setMessage(jsonObject.getString("message"));
        System.out.println("收到的消息："+message);
    }

    /** Builds a short "topic-partition@offset" label used in diagnostics. */
    private static String describe(ConsumerRecord<?, ?> record) {
        return record.topic() + "-" + record.partition() + "@" + record.offset();
    }
}

